#include "config.h"

#include <sys/mman.h>
#include <libaio.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "replica.h"
#include "job_dock.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "fileinfo.h"
#include "disk.h"
#include "squeue.h"
#include "core.h"
#include "bh.h"
#include "main_loop.h"
#include "timer.h"
#include "net_global.h"
#include "bmap.h"
#include "dbg.h"
#include "../../storage/controller/ramdisk.h"

/*
 * Context record for a replica-server read.
 *
 * NOTE(review): this type is not referenced in the part of the file reviewed
 * here; the field notes below are inferred from the names only — confirm
 * against its users before relying on them.
 */
typedef struct {
        chkid_t chkid;          /* presumably the chunk being read */
        task_t *task;           /* presumably the task waiting on the read */
        buffer_t *buf;          /* presumably the destination buffer */
        int offset;             /* presumably the offset inside the chunk */
        int size;               /* presumably the number of bytes to read */
        uint64_t clock;         /* presumably the version clock of the request */
        nid_t reader;           /* presumably the node issuing the read */
} replica_srv_ctx_read_t;

/* Local shorthand for replica_srv_entry_t, the value stored in a cache entry. */
typedef replica_srv_entry_t entry_t;

/**
 * Decide whether @reader may read the chunk described by @ent right now.
 *
 * The checks run in this order:
 *  1. when the request carries a lease (io->lease != -1) it must equal
 *     ent->token.seq, otherwise EPERM;
 *  2. nid_cmp(&ent->owner, reader) must return 0, otherwise EPERM;
 *  3. the replica clock must not be behind the requested @vclock,
 *     otherwise EAGAIN (returned without going through GOTO()).
 *
 * @return 0 when every check passes, EPERM or EAGAIN as listed above.
 */
STATIC int IO_FUNC __replica_read_check(entry_t *ent, const nid_t *reader,
                                        const io_t *io, const vclock_t *vclock)
{
        int ret;

        /* lease check: -1 means "no lease supplied" and skips the comparison */
        if (unlikely(io->lease != -1 && io->lease != ent->token.seq)) {
                DWARN("chunk "CHKID_FORMAT" owner %s:%s, lease %x:%x\n",
                      CHKID_ARG(&ent->chkid), network_rname(&ent->owner),
                      network_rname(reader), ent->token.seq, io->lease);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        /* ownership check: a non-zero nid_cmp() result rejects the reader */
        if (unlikely(nid_cmp(&ent->owner, reader) != 0)) {
                DINFO("chunk "CHKID_FORMAT" owner %s reader %s\n",
                      CHKID_ARG(&ent->chkid),
                      network_rname(&ent->owner),
                      network_rname(reader));
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        /* the replica has not yet reached the clock the reader asked for */
        if (unlikely(vclock->clock > ent->vclock.clock)) {
                DBUG("read "CHKID_FORMAT" clock %llu:%llu\n",
                     CHKID_ARG(&ent->chkid),
                     (LLU)vclock->clock, (LLU)ent->vclock.clock);
                ret = EAGAIN;
                goto err_ret;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Read io->size bytes of chunk io->id from disk location @loc into @buf.
 *
 * @buf is initialised here with mbuffer_init(); on any failure it is freed
 * again before returning, so the caller owns it only when 0 is returned.
 *
 * @param pool  unused in this function
 * @param loc   disk location handed to diskmd_aio_read()
 * @param io    request descriptor (id, offset, size)
 * @param buf   output buffer, initialised by this function
 * @return 0 on success, otherwise the error from mbuffer_init(),
 *         diskmd_aio_read() or (with RAMDISK_ENABLE) ramdisk_read().
 */
STATIC int IO_FUNC __replica_read_commit__(const char *pool, const diskloc_t *loc, const io_t *io, buffer_t *buf)
{
        int ret, prio;

        (void) pool;

        ret = mbuffer_init(buf, io->size);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* raw chunks are submitted with prio 0, every other chunk type with 1 */
        prio = io->id.type == __RAW_CHUNK__ ? 0 : 1;

        DBUG("read "CHKID_FORMAT" prio %u\n", CHKID_ARG(&io->id), prio);

        ret = diskmd_aio_read(&io->id, loc, buf, io->offset, prio);
        if (unlikely(ret))
                GOTO(err_free, ret);

#if RAMDISK_ENABLE
        /* debug aid: read the same range from the ramdisk and compare */
        buffer_t cmp;

        mbuffer_init(&cmp, 0);
        ret = ramdisk_read(&io->id, &cmp, io->size, io->offset);
        if (unlikely(ret))
                GOTO(err_free1, ret);

        DWARN("replica read "CHKID_FORMAT" offset:%ld size:%d clock:%ld loc disk %d idx %d\n",
                        CHKID_ARG(&io->id), io->offset, io->size, io->vclock.clock, loc->diskid, loc->idx);

        /* a mismatch is only reported, the read itself still succeeds */
        if (mbuffer_compare(&cmp, buf)) {
                DWARN("read raw "CHKID_FORMAT" size %llu(%d,%d) off %llu not match with ramdisk\n",
                                CHKID_ARG(&io->id), (LLU)io->size, buf->len, cmp.len, (LLU)io->offset);
        }

        mbuffer_free(&cmp);
#endif

#if ISCSI_IO_RECORD
        char mem[MAX_INFO_LEN];

        /* bounded formatting: never write past the fixed-size label buffer */
        snprintf(mem, sizeof(mem), "replica_read_commit "CHKID_FORMAT" (%llu, %llu)",
                        CHKID_ARG(&io->id),
                        (LLU)io->offset, (LLU)io->size);

        mbuffer_dump(buf, 8, mem);
#endif

        return 0;
#if RAMDISK_ENABLE
err_free1:
        mbuffer_free(&cmp);
#endif
err_free:
        mbuffer_free(buf);
err_ret:
        return ret;
}

/**
 * Under the entry's write lock: validate the read and register @wlist on the
 * entry's wait queue.
 *
 * On success the entry's disk location is copied to @loc and the entry
 * pointer is returned through @_ent; the lock is dropped before returning.
 *
 * @return 0 on success, the mcache_wrlock() error, or the result of
 *         __replica_read_check() (EAGAIN is passed through without GOTO()).
 */
STATIC int IO_FUNC __replica_read_reg(const nid_t *reader, mcache_entry_t *cent,
                const io_t *io, wlist_t *wlist, diskloc_t *loc, entry_t **_ent)
{
        int ret;
        entry_t *entry;

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        entry = cent->value;

        ret = __replica_read_check(entry, reader, io, &wlist->vclock);
        if (unlikely(ret == EAGAIN))
                goto err_lock;
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /* hand the results back while the entry is still locked */
        *_ent = entry;
        *loc = entry->loc;

        __replica_srv_wqadd(entry, wlist);

        mcache_unlock(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_ret:
        return ret;
}

/**
 * Unlink @wlist from its list while holding the write lock of @cent.
 *
 * If the lock cannot be taken, EXIT(EAGAIN) is invoked before the error is
 * returned.
 *
 * @return 0 on success, otherwise the mcache_wrlock() error.
 */
STATIC int __replica_read_cleanup(mcache_entry_t *cent, wlist_t *wlist)
{
        int ret = mcache_wrlock(cent);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_del(&wlist->hook);
        mcache_unlock(cent);

        return 0;
err_ret:
        EXIT(EAGAIN);
        return ret;
}

/**
 * Perform one read attempt for @io on behalf of @reader.
 *
 * Steps: look up the chunk entry, register a stack-allocated wlist record
 * (op __OP_READING__) via __replica_read_reg(), read the data with
 * __replica_read_commit__(), then unlink the record again with
 * __replica_read_cleanup(). The entry reference is released on every path.
 *
 * @return 0 on success; EAGAIN from the registration step is returned
 *         without GOTO(); any other error is returned as-is. When the disk
 *         read fails, its error is the one returned even if cleanup also fails.
 */
STATIC int IO_FUNC __replica_read_commit(const nid_t *reader, const io_t *io, buffer_t *buf)
{
        int ret, retval;
        mcache_entry_t *cent;
        entry_t *ent;
        diskloc_t loc;
        wlist_t wlist;

        DBUG("replica_srv read "CHKID_FORMAT" size %u offset %llu \n",
             CHKID_ARG(&io->id), io->size, (LLU)io->offset);

        ret = replica_srv_get(&io->id, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* describe this in-flight read for the entry's wait queue */
        wlist.op = __OP_READING__;
        wlist.vclock = io->vclock;
        wlist.magic = -1;
        wlist.ltime = -1;
        wlist.begin = gettime();
        wlist.task.taskid = -1;

        ret = __replica_read_reg(reader, cent, io, &wlist, &loc, &ent);
        if (unlikely(ret == EAGAIN))
                goto err_release;
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __replica_read_commit__(ent->pool, &loc, io, buf);

        /* the record lives on this stack frame, so it is always unlinked */
        retval = __replica_read_cleanup(cent, &wlist);

        if (unlikely(ret)) {
                if (retval)
                        GOTO(err_release, retval);

                GOTO(err_release, ret);
        }

        if (unlikely(retval)) {
                ret = retval;
                GOTO(err_release, ret);
        }

        replica_srv_release(cent);

        return 0;
err_release:
        replica_srv_release(cent);
err_ret:
        return ret;
}

/*
 * Snapshot of a waiting reader, filled from a wlist_t by
 * __replica_read_wait_check() and consumed by __replica_read_wait_check__().
 */
typedef struct {
        task_t task;            /* copy of wlist->task (used for logging) */
        vclock_t vclock;        /* copy of wlist->vclock, matched against queue entries */
        uint32_t magic;         /* copy of wlist->magic */
        time_t ltime;           /* copy of wlist->ltime, compared with network_connect()'s value */
        chkid_t chkid;          /* chunk whose wait queue is inspected */
        nid_t reader;           /* copy of wlist->writer: the node that issued the read */
} arg_t;

/**
 * Task body that re-examines a parked reader described by @_arg (an arg_t).
 *
 * With the entry write-locked and its wait-queue spin lock held, the queue is
 * scanned for a __OP_READ_WAIT__ record with the same clock and ltime. The
 * first such record is unlinked and its task resumed with ETIME when either
 *  - network_connect() to the reader fails or reports a different ltime, or
 *  - the record has waited longer than gloconf.lease_timeout / 2.
 * At most one record is resumed per invocation.
 *
 * @_arg is returned to MEM_CACHE_4K on every path.
 */
STATIC void __replica_read_wait_check__(void *_arg)
{
        int ret;
        arg_t *arg = _arg;
        chkid_t *chkid = &arg->chkid;
        mcache_entry_t *cent;
        entry_t *ent;
        wlist_t *waiter;
        struct list_head *pos, *n;
        time_t ltime;

        DINFO("core[%d][%d] read "CHKID_FORMAT" clock %llu check\n",
                        arg->task.scheduleid, arg->task.taskid,
                        CHKID_ARG(chkid), (LLU)arg->vclock.clock);

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;

        ret = sy_spin_lock(&ent->wq.lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        list_for_each_safe(pos, n, &ent->wq.wlist) {
                /* the list hook is treated as the start of wlist_t */
                waiter = (void *)pos;

                /*
                 * Skip records that do not belong to this waiter.
                 * NOTE(review): magic is compared with -1, not with
                 * waiter->magic — confirm this is intended.
                 */
                if (arg->vclock.clock != waiter->vclock.clock
                    || __OP_READ_WAIT__ != waiter->op
                    || arg->magic != -1
                    || arg->ltime != waiter->ltime)
                        continue;

                /* connection to the reader failed or was re-established since the wait began */
                ret = network_connect(&arg->reader, &ltime, 1, 0);
                if (ret || ltime != arg->ltime) {
                        DERROR("core[%d][%d] read "CHKID_FORMAT" clock %llu ret %d reset\n",
                                        waiter->task.scheduleid, waiter->task.taskid,
                                        CHKID_ARG(chkid), (LLU)arg->vclock.clock, ret);
                        list_del_init(&waiter->hook);
                        schedule_resume(&waiter->task, ETIME, NULL);
                        break;
                }

                /* waited for more than half of the lease timeout */
                if (gettime() - waiter->begin > gloconf.lease_timeout / 2) {
                        DERROR("core[%d][%d] read "CHKID_FORMAT" clock %llu begin %ld timeout\n",
                                        waiter->task.scheduleid, waiter->task.taskid,
                                        CHKID_ARG(chkid), (LLU)arg->vclock.clock, waiter->begin);
                        list_del_init(&waiter->hook);
                        schedule_resume(&waiter->task, ETIME, NULL);
                        break;
                }
        }

        sy_spin_unlock(&ent->wq.lock);

        mcache_unlock(cent);
        replica_srv_release(cent);
        mem_cache_free(MEM_CACHE_4K, _arg);

        return;
err_release:
        replica_srv_release(cent);
err_ret:
        mem_cache_free(MEM_CACHE_4K, _arg);
        return;
}

/**
 * Callback passed to schedule_yield1() by __replica_read_wait().
 *
 * Copies the relevant fields of the parked wlist_t (@_args) into a
 * MEM_CACHE_4K-allocated arg_t and starts a "replica_wait_check" task running
 * __replica_read_wait_check__(), which frees the copy.
 *
 * NOTE(review): the result of mem_cache_calloc1() is used without a NULL
 * check — confirm the allocator cannot fail here.
 */
STATIC void __replica_read_wait_check(void *_args)
{
        const wlist_t *src = _args;
        arg_t *snapshot = mem_cache_calloc1(MEM_CACHE_4K, 0);

        snapshot->chkid = src->chkid;
        snapshot->reader = src->writer;
        snapshot->vclock = src->vclock;
        snapshot->ltime = src->ltime;
        snapshot->magic = src->magic;
        snapshot->task = src->task;

        schedule_task_new("replica_wait_check", __replica_read_wait_check__, snapshot, -1);
}

STATIC int __replica_read_wait(const nid_t *reader, const io_t *io, time_t ltime, int retry)
{
        int ret, wait;
        wlist_t wlist;
        mcache_entry_t *cent;
        entry_t *ent;
        char name[MAX_NAME_LEN];

        ret = replica_srv_get(&io->id, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;

        ret = __replica_read_check(ent, reader, io, &io->vclock);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        if (schedule_running()) {
                                wlist.vclock = io->vclock;
                                wlist.chkid = io->id;
                                wlist.writer = *reader;
                                wlist.op = __OP_READ_WAIT__;
                                wlist.magic = -1;
                                wlist.ltime = ltime;
                                wlist.begin = gettime();
                                wlist.task = schedule_task_get();

                                __replica_srv_wqadd(ent, &wlist);
                                wait = 1;
                        } else {
                                GOTO(err_lock, ret);
                        }
                } else
                        GOTO(err_lock, ret);
        } else {
                wait = 0;
        }

        mcache_unlock(cent);
        replica_srv_release(cent);

        if (wait) {
                DBUG("read "CHKID_FORMAT" clock %llu wait, retry %u\n",
                     CHKID_ARG(&io->id), (LLU)io->vclock.clock, retry);

                sprintf(name, "read_wait_clock "CHKID_FORMAT" %ld:%ld",
                                CHKID_ARG(&io->id), io->vclock.clock, ent->vclock.clock);
                ret = schedule_yield1(name, NULL, &wlist,
                                      __replica_read_wait_check, 2);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                DBUG("read "CHKID_FORMAT" clock %llu resume, retry %u\n",
                     CHKID_ARG(&io->id), (LLU)io->vclock.clock, retry);
        }

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * Serve a read request, retrying while the replica clock lags behind.
 *
 * After network_connect() to @reader succeeds and the request is validated
 * (non-null chunk id asserted, zero size rejected with EINVAL), the read is
 * attempted with __replica_read_commit(). Each EAGAIN result is followed by
 * __replica_read_wait() and another attempt; any other error ends the loop.
 *
 * @return 0 on success; on failure the error code, with ENODEV reported to
 *         the caller as EAGAIN.
 */
STATIC int IO_FUNC __replica_srv_read(const nid_t *reader, const io_t *io, buffer_t *buf)
{
        int ret, retry;
        time_t ltime;

        ret = network_connect(reader, &ltime, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (unlikely(io->id.type != __RAW_CHUNK__)) {
                DBUG("chunk "CHKID_FORMAT" offset %llu size %u, clock %llu, prep\n",
                      CHKID_ARG(&io->id), (LLU)io->offset, io->size, (LLU)io->vclock.clock);
        }

        YASSERT(io->id.id != CHKID_NULL);

        if (unlikely(io->size == 0)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        /* retry counts completed waits; it is only passed on for logging */
        for (retry = 0; ; retry++) {
                ret = __replica_read_commit(reader, io, buf);
                if (unlikely(ret)) {
                        if (ret != EAGAIN)
                                GOTO(err_ret, ret);

                        ret = __replica_read_wait(reader, io, ltime, retry);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        continue;
                }

                break;
        }

        return 0;
err_ret:
        /* a missing device is reported to the caller as a retryable condition */
        if (ret == ENODEV)
                ret = EAGAIN;
        return ret;
}

/**
 * va_list adapter handed to core_request(): unpacks (reader, io, buf) in that
 * order and forwards them to __replica_srv_read().
 *
 * @return whatever __replica_srv_read() returns.
 */
STATIC int __replica_srv_read_request(va_list ap)
{
        const nid_t *reader;
        const io_t *io;
        buffer_t *buf;

        reader = va_arg(ap, nid_t *);
        io = va_arg(ap, io_t *);
        buf = va_arg(ap, buffer_t *);
        va_end(ap);

        return __replica_srv_read(reader, io, buf);
}

/**
 * Public entry point for reading a chunk replica.
 *
 * Raw chunks (__RAW_CHUNK__) are read directly in the calling context; every
 * other chunk type is dispatched through core_request() to the core selected
 * by core_hash(&io->id).
 *
 * @param reader  node issuing the read
 * @param io      request descriptor (id, offset, size, vclock)
 * @param buf     buffer receiving the data
 * @return 0 on success, otherwise the error from the chosen path.
 */
int IO_FUNC replica_srv_read(const nid_t *reader, const io_t *io, buffer_t *buf)
{
        int ret;

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk "CHKID_FORMAT" offset %llu size %u, clock %llu\n",
             CHKID_ARG(&io->id), (LLU)io->offset, io->size, (LLU)io->vclock.clock);
#else
        DBUG("chunk "CHKID_FORMAT" offset %llu size %u, clock %llu\n",
             CHKID_ARG(&io->id), (LLU)io->offset, io->size, (LLU)io->vclock.clock);
#endif

        ANALYSIS_BEGIN(0);

        if (unlikely(io->id.type != __RAW_CHUNK__)) {
                /* non-raw chunks are served on the core that owns the chunk id */
                ret = core_request(core_hash(&io->id), -1, "replica_read", __replica_srv_read_request,
                                   reader, io, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
                ANALYSIS_QUEUE(0, IO_WARN, "replica_srv_readmd");
        } else {
                ret = __replica_srv_read(reader, io, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
                ANALYSIS_QUEUE(0, IO_WARN, "replica_srv_read");
        }

        return 0;
err_ret:
        return ret;
}

int replica_srv_sha1(const chkid_t *chkid, uint64_t version, char *buf)
{
        int ret;
        mcache_entry_t *cent;
        entry_t *ent;

        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;

        if (version > ent->vclock.clock) {
                DWARN("check "CHKID_FORMAT" version %llu:%llu\n", CHKID_ARG(chkid),
                      (LLU)version, (LLU)ent->vclock.clock);

                ret = EAGAIN;
                goto err_lock;
        }

        mcache_unlock(cent);
        replica_srv_release(cent);

        ret = disk_sha1(&ent->loc, buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        replica_srv_release(cent);
err_ret:
        return ret;
}
